{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Advanced Update Operations\n", "\n", "ParquetDB’s `update` method allows you to modify existing records in your dataset by matching on one or more “update keys.” Typically, the `id` field is used to identify which records to update, but you can also specify additional or different keys based on your data schema.\n", "\n", "\n", "\n", "```python\n", "def update(\n", " self,\n", " data: Union[list, dict, pd.DataFrame],\n", " schema: pa.Schema = None,\n", " metadata: dict = None,\n", " fields_metadata: dict = None,\n", " update_keys: Union[list, str] = [\"id\"],\n", " treat_fields_as_ragged=None,\n", " convert_to_fixed_shape: bool = True,\n", " normalize_config: NormalizeConfig = NormalizeConfig(),\n", "):\n", " \"\"\"\n", " Updates existing records in the database.\n", "\n", " Parameters\n", " ----------\n", " data : dict, list of dicts, or pandas.DataFrame\n", " The data to update. Each record must contain an 'id' (or specified update key) \n", " corresponding to the record(s) to update.\n", " schema : pyarrow.Schema, optional\n", " The schema for the data being updated. If not provided, it is inferred.\n", " metadata : dict, optional\n", " Additional metadata for the entire dataset.\n", " fields_metadata : dict, optional\n", " Additional metadata for each field/column.\n", " update_keys : list of str or str, optional\n", " Which columns to use for matching existing records. Default is 'id'.\n", " treat_fields_as_ragged : list of str, optional\n", " A list of fields to treat as ragged arrays.\n", " convert_to_fixed_shape : bool, optional\n", " If True, convert ragged arrays to a fixed shape if possible.\n", " normalize_config : NormalizeConfig, optional\n", " Configuration for the normalization process, optimizing performance\n", " by managing row distribution and file structure.\n", "\n", " Example\n", " -------\n", " >>> db.update(\n", " ... data=[{\"id\": 1, \"name\": \"John\", \"age\": 30}, {\"id\": 2, \"name\": \"Jane\", \"age\": 25}]\n", " ... )\n", " \"\"\"\n", " ...\n", "---\n" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "============================================================\n", "PARQUETDB SUMMARY\n", "============================================================\n", "Database path: ParquetDB\n", "\n", "• Number of columns: 5\n", "• Number of rows: 4\n", "• Number of files: 1\n", "• Number of rows per file: [4]\n", "• Number of row groups per file: [1]\n", "• Serialized metadata size per file: [1101] Bytes\n", "\n", "############################################################\n", "METADATA\n", "############################################################\n", "\n", "############################################################\n", "COLUMN DETAILS\n", "############################################################\n", "• Columns:\n", " - id\n", " - nested.a\n", " - name\n", " - nested.b\n", " - age\n", "\n", " age id name nested.a nested.b\n", "0 30 0 John 1 2\n", "1 25 1 Jane 3 4\n", "2 30 2 Jimmy 1 2\n", "3 35 3 Jill 3 4\n" ] } ], "source": [ "import pprint\n", "import shutil\n", "import os\n", "from pathlib import Path\n", "import pandas as pd\n", "import pyarrow as pa\n", "from parquetdb import ParquetDB, NormalizeConfig\n", "\n", "ROOT_DIR = Path(\".\")\n", "DATA_DIR = ROOT_DIR / \"data\"\n", "\n", "if DATA_DIR.exists():\n", " shutil.rmtree(DATA_DIR)\n", " \n", "db_path = ROOT_DIR / \"ParquetDB\"\n", "\n", "db = ParquetDB(db_path)\n", "data = [\n", " {\"name\": \"John\", \"age\": 30, \"nested\": {\"a\": 1, \"b\": 2}},\n", " {\"name\": \"Jane\", \"age\": 25, \"nested\": {\"a\": 3, \"b\": 4}},\n", " {\"name\": \"Jimmy\", \"age\": 30, \"nested\": {\"a\": 1, \"b\": 2}},\n", " {\"name\": \"Jill\", \"age\": 35, \"nested\": {\"a\": 3, \"b\": 4}},\n", "]\n", "\n", "db.create(data)\n", "print(db)\n", "\n", "df = db.read().to_pandas()\n", "print(df)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Basic Usage\n", "\n", "Here’s how to call `update` on a ParquetDB instance. We’ll assume the dataset is already populated with some records. When data is inputed into ParquetDB, it is assigned a unique id for which each record can be matched and updated. From the above data the index starts at 0 and goes to 3 sequentially through the list of dictionaries.\n" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ " age id name nested.a nested.b\n", "0 30 0 John 1 2\n", "1 31 1 Jane 3 4\n", "2 26 2 Jimmy 1 2\n", "3 35 3 Jill 3 4\n" ] } ], "source": [ "update_data = [\n", " {\"id\": 1, \"age\": 31},\n", " {\"id\": 2, \"age\": 26},\n", "]\n", "\n", "db.update(update_data)\n", "\n", "df = db.read().to_pandas()\n", "print(df)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As you can see the data has been updated with the new values. You can also specify nested dictionaries and it will update the corresponding nested values." ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ " age id name nested.a nested.b\n", "0 30 0 John 100 2\n", "1 31 1 Jane 3 4\n", "2 26 2 Jimmy 1 2\n", "3 35 3 Jill 200 4\n" ] } ], "source": [ "update_data = [\n", " {\"id\": 0, \"nested\": {\"a\": 100}},\n", " {\"id\": 3, \"nested\": {\"a\": 200}},\n", "]\n", "\n", "db.update(update_data)\n", "\n", "df = db.read().to_pandas()\n", "print(df)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Update on multiple keys\n", "\n", "You can also update on select which keys to update on and can even update on multiple keys." ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "============================================================\n", "PARQUETDB SUMMARY\n", "============================================================\n", "Database path: ParquetDB\n", "\n", "• Number of columns: 4\n", "• Number of rows: 5\n", "• Number of files: 1\n", "• Number of rows per file: [5]\n", "• Number of row groups per file: [1]\n", "• Serialized metadata size per file: [907] Bytes\n", "\n", "############################################################\n", "METADATA\n", "############################################################\n", "\n", "############################################################\n", "COLUMN DETAILS\n", "############################################################\n", "• Columns:\n", " - field_1\n", " - id_1\n", " - id\n", " - id_2\n", "\n", " field_1 id id_1 id_2\n", "0 here 0 100 10\n", "1 None 1 55 11\n", "2 None 2 33 12\n", "3 None 3 12 13\n", "4 None 4 33 50\n" ] } ], "source": [ "db_path = \"ParquetDB\"\n", "if os.path.exists(db_path):\n", " shutil.rmtree(db_path)\n", "\n", "db = ParquetDB(db_path)\n", "current_data = [\n", " {\"id_1\": 100, \"id_2\": 10, \"field_1\": \"here\"},\n", " {\"id_1\": 55, \"id_2\": 11},\n", " {\"id_1\": 33, \"id_2\": 12},\n", " {\"id_1\": 12, \"id_2\": 13},\n", " {\"id_1\": 33, \"id_2\": 50},\n", "]\n", "\n", "db.create(current_data)\n", "print(db)\n", "\n", "df = db.read().to_pandas()\n", "print(df)" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ " field_1 field_2 field_3 id id_1 id_2\n", "0 here there None 0 100 10\n", "1 None None None 1 55 11\n", "2 None field_2 field_3 2 33 12\n", "3 None None None 3 12 13\n", "4 None None None 4 33 50\n" ] } ], "source": [ "incoming_data = [\n", " {\"id_1\": 100, \"id_2\": 10, \"field_2\": \"there\"},\n", " {\"id_1\": 5, \"id_2\": 5},\n", " {\n", " \"id_1\": 33,\n", " \"id_2\": 13,\n", " }, # Note: emp_id 4 doesn't exist in employees. So no update will be applied\n", " {\n", " \"id_1\": 33,\n", " \"id_2\": 12,\n", " \"field_2\": \"field_2\",\n", " \"field_3\": \"field_3\",\n", " },\n", "]\n", "\n", "\n", "db.update(incoming_data, update_keys=[\"id_1\", \"id_2\"])\n", "\n", "table = db.read()\n", "print(table.to_pandas())\n", "assert table[\"field_1\"].combine_chunks().to_pylist() == [\"here\", None, None, None, None]\n", "assert table[\"field_2\"].combine_chunks().to_pylist() == [\n", " \"there\",\n", " None,\n", " \"field_2\",\n", " None,\n", " None,\n", "]\n", "assert table[\"field_3\"].combine_chunks().to_pylist() == [\n", " None,\n", " None,\n", " \"field_3\",\n", " None,\n", " None,\n", "]" ] } ], "metadata": { "kernelspec": { "display_name": "parquetdb_dev", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.20" } }, "nbformat": 4, "nbformat_minor": 2 }